package com.jokerku.mini.schedule.sharding;

import com.alibaba.fastjson.JSON;
import com.jokerku.mini.schedule.common.Constants;
import com.jokerku.mini.schedule.domain.ExecOrder;
import com.jokerku.mini.schedule.domain.ZkPath;
import com.jokerku.mini.schedule.service.ZkCuratorServer;
import com.jokerku.mini.schedule.util.SleepUtil;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author: guzq
 * @CreateTime: 2023/06/20 10:51
 * @Description: 分片服务
 * @Version: 1.0
 */
public class ShardingService {

    private static final Logger logger = LoggerFactory.getLogger(ShardingService.class);

    private static final ShardingService INSTANCE = new ShardingService();

    private ShardingService() {
    }

    public static ShardingService getInstance() {
        return INSTANCE;
    }

    /**
     * 重置分片状态
     */
    public void setReShardingFlag() {
        if (!Constants.Global.leaderService.isLeader()) {
            return;
        }
        try {
            boolean necessaryExists = ZkCuratorServer.isExisted(Constants.Global.client, ZkPath.getShardingNecessaryPath());
            if (!necessaryExists) {
                ZkCuratorServer.create(Constants.Global.client, ZkPath.getShardingNecessaryPath());
            }
        } catch (Exception ignore) {
        }
    }

    /**
     * 判断是否需要分片
     *
     * @return true:需要,反之 false
     */
    public boolean isNeedSharding() {
        return !getAllRunningIp().isEmpty()
                && ZkCuratorServer.isExisted(Constants.Global.client, ZkPath.getShardingNecessaryPath());
    }

    /**
     * 分片
     */
    public void shardingIfNecessary() {
        if (!isNeedSharding()) {
            return;
        }
        if (!Constants.Global.leaderService.isLeader()) {
            waitShardingCompleted();
            return;
        }

        doSharding();
    }


    private void doSharding() {
        InterProcessMutex lock = ZkCuratorServer.getLock(Constants.Global.client, ZkPath.getShardingLockPath());
        try {
            lock.acquire();
            if (!isNeedSharding()) {
                return;
            }
            logger.info("Start sharding");
            beginSharding();
            logger.info("End sharding");
        } catch (Exception e) {
            logger.error("Mini schedule sharding error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception ignore) {
            }
        }
    }

    /**
     * 开始处理分片
     * @throws Exception
     */
    private void beginSharding() throws Exception {
        boolean shardingSuccess = false;
        try {
            ZkCuratorServer.create(Constants.Global.client, ZkPath.getShardingProcessingPath());
            reSetShardingInfo();
            for (Map.Entry<String, List<ExecOrder>> entry : Constants.EXEC_ORDER_MAP.entrySet()) {
                for (ExecOrder execOrder : entry.getValue()) {
                    if (!execOrder.getAutoStartup()) {
                        continue;
                    }
                    // TODO 这里存在bug， 当新的一台机器启动时带有新的任务 由于主节点不存在该任务 或 autoStartup = false 导致新任务不会被分配
                    ShardingStrategy shardingStrategy = ShardingStrategyFactory.getShardingStrategy("AVERAGE_ALLOCATION");
                    Map<String, List<Integer>> shardingResultMap = shardingStrategy.sharding(getAllRunningIp(), execOrder.getShardingTotalCount());
                    logger.info("Sharding success, taskId: {}--{}", execOrder.getTaskId(), shardingResultMap);
                    setShardingResult(execOrder, shardingResultMap);
                }
            }
            shardingSuccess = true;
        } finally {
            afterSharding(shardingSuccess);
        }
    }

    /**
     * 重置任务分片信息
     */
    private void reSetShardingInfo() {
        for (Map.Entry<String, List<ExecOrder>> entry : Constants.EXEC_ORDER_MAP.entrySet()) {
            for (ExecOrder execOrder : entry.getValue()) {
                try {
                    String runningTaskPath = ZkPath.getRunningTaskPath(execOrder.getTaskId());
                    ZkCuratorServer.deletingChildrenIfNeeded(Constants.Global.client, runningTaskPath);
                    logger.info("Delete history sharding result, path:{}", runningTaskPath);
                } catch (Exception ignore) {
                }
            }
        }
    }

    /**
     * 将分片结果保证至 zk 中
     *
     * @param execOrder         任务
     * @param shardingResultMap 分片结果
     * @throws Exception
     */
    private void setShardingResult(ExecOrder execOrder, Map<String, List<Integer>> shardingResultMap) throws Exception {
        String runningTaskPath = ZkPath.getRunningTaskPath(execOrder.getTaskId());
        if (!ZkCuratorServer.isExisted(Constants.Global.client, runningTaskPath)) {
            ZkCuratorServer.create(Constants.Global.client, runningTaskPath);
        }
        for (Map.Entry<String, List<Integer>> entry : shardingResultMap.entrySet()) {
            String ip = entry.getKey();
            List<Integer> shardingItems = entry.getValue();
            String runningTaskIpPath = ZkPath.getRunningTaskIpPath(execOrder.getTaskId(), ip);
            logger.info("Set Sharding result to Zk, path: {}, result:{}", runningTaskIpPath, shardingItems);
            // 设置分片结果
            if (shardingItems.isEmpty()) {
                continue;
            }
            ZkCuratorServer.createEphemeralNode(Constants.Global.client, runningTaskIpPath, JSON.toJSONString(shardingItems));
        }
    }

    /**
     * 分片完成后置处理
     *
     * @param shardingSuccess 是否分片成功 true:成功, false:失败
     */
    private void afterSharding(boolean shardingSuccess) {
        try {
            ZkCuratorServer.deletingChildrenIfNeeded(Constants.Global.client, ZkPath.getShardingProcessingPath());
            if (shardingSuccess) {
                ZkCuratorServer.deletingChildrenIfNeeded(Constants.Global.client, ZkPath.getShardingNecessaryPath());
            }
        } catch (Exception e) {
            logger.error("Mini schedule sharding delete path error", e);
        }
    }

    /**
     * 等待主节点分片完成
     */
    private void waitShardingCompleted() {
        while (!Constants.Global.leaderService.isLeader() &&
                (ZkCuratorServer.isExisted(Constants.Global.client, ZkPath.getShardingNecessaryPath())
                        || ZkCuratorServer.isExisted(Constants.Global.client, ZkPath.getShardingProcessingPath()))) {
            logger.info("Waiting leader sharding complete");
            SleepUtil.sleep();
        }
    }

    /**
     * 获取所有运行中的实例 ip
     *
     * @return ip list
     */
    private List<String> getAllRunningIp() {
        try {
            String pathRootServerIp = ZkPath.buildServerRunningIpPath(Constants.Global.scheduleServerId);
            return Constants.Global.client.getChildren().forPath(pathRootServerIp);
        } catch (Exception e) {
            logger.error("Get all running ip error", e);
        }
        return Collections.emptyList();
    }
}
